mirror of
https://github.com/cwinfo/yggdrasil-go.git
synced 2024-11-22 20:00:27 +00:00
commit
fc5a5830aa
@ -300,6 +300,7 @@ func pingNodes(store map[[32]byte]*Node) {
|
|||||||
}
|
}
|
||||||
case <-ch:
|
case <-ch:
|
||||||
sendTo(payload, destAddr)
|
sendTo(payload, destAddr)
|
||||||
|
//dumpDHTSize(store) // note that this uses racey functions to read things...
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ticker.Stop()
|
ticker.Stop()
|
||||||
@ -386,7 +387,7 @@ func (n *Node) startTCP(listen string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) connectTCP(remoteAddr string) {
|
func (n *Node) connectTCP(remoteAddr string) {
|
||||||
n.core.AddPeer(remoteAddr)
|
n.core.AddPeer(remoteAddr, remoteAddr)
|
||||||
}
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
@ -437,7 +438,7 @@ func main() {
|
|||||||
pingNodes(kstore)
|
pingNodes(kstore)
|
||||||
//pingBench(kstore) // Only after disabling debug output
|
//pingBench(kstore) // Only after disabling debug output
|
||||||
//stressTest(kstore)
|
//stressTest(kstore)
|
||||||
//time.Sleep(120*time.Second)
|
time.Sleep(120 * time.Second)
|
||||||
dumpDHTSize(kstore) // note that this uses racey functions to read things...
|
dumpDHTSize(kstore) // note that this uses racey functions to read things...
|
||||||
if false {
|
if false {
|
||||||
// This connects the sim to the local network
|
// This connects the sim to the local network
|
||||||
|
@ -556,25 +556,23 @@ func (a *admin) getData_getSwitchQueues() admin_nodeInfo {
|
|||||||
// getData_getDHT returns info from Core.dht for an admin response.
|
// getData_getDHT returns info from Core.dht for an admin response.
|
||||||
func (a *admin) getData_getDHT() []admin_nodeInfo {
|
func (a *admin) getData_getDHT() []admin_nodeInfo {
|
||||||
var infos []admin_nodeInfo
|
var infos []admin_nodeInfo
|
||||||
now := time.Now()
|
|
||||||
getDHT := func() {
|
getDHT := func() {
|
||||||
for i := 0; i < a.core.dht.nBuckets(); i++ {
|
now := time.Now()
|
||||||
b := a.core.dht.getBucket(i)
|
var dhtInfos []*dhtInfo
|
||||||
getInfo := func(vs []*dhtInfo, isPeer bool) {
|
for _, v := range a.core.dht.table {
|
||||||
for _, v := range vs {
|
dhtInfos = append(dhtInfos, v)
|
||||||
addr := *address_addrForNodeID(v.getNodeID())
|
}
|
||||||
info := admin_nodeInfo{
|
sort.SliceStable(dhtInfos, func(i, j int) bool {
|
||||||
{"ip", net.IP(addr[:]).String()},
|
return dht_ordered(&a.core.dht.nodeID, dhtInfos[i].getNodeID(), dhtInfos[j].getNodeID())
|
||||||
{"coords", fmt.Sprint(v.coords)},
|
})
|
||||||
{"bucket", i},
|
for _, v := range dhtInfos {
|
||||||
{"peer_only", isPeer},
|
addr := *address_addrForNodeID(v.getNodeID())
|
||||||
{"last_seen", int(now.Sub(v.recv).Seconds())},
|
info := admin_nodeInfo{
|
||||||
}
|
{"ip", net.IP(addr[:]).String()},
|
||||||
infos = append(infos, info)
|
{"coords", fmt.Sprint(v.coords)},
|
||||||
}
|
{"last_seen", int(now.Sub(v.recv).Seconds())},
|
||||||
}
|
}
|
||||||
getInfo(b.other, false)
|
infos = append(infos, info)
|
||||||
getInfo(b.peers, true)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
a.core.router.doAdmin(getDHT)
|
a.core.router.doAdmin(getDHT)
|
||||||
|
@ -229,12 +229,10 @@ func DEBUG_wire_encode_coords(coords []byte) []byte {
|
|||||||
// DHT, via core
|
// DHT, via core
|
||||||
|
|
||||||
func (c *Core) DEBUG_getDHTSize() int {
|
func (c *Core) DEBUG_getDHTSize() int {
|
||||||
total := 0
|
var total int
|
||||||
for bidx := 0; bidx < c.dht.nBuckets(); bidx++ {
|
c.router.doAdmin(func() {
|
||||||
b := c.dht.getBucket(bidx)
|
total = len(c.dht.table)
|
||||||
total += len(b.peers)
|
})
|
||||||
total += len(b.other)
|
|
||||||
}
|
|
||||||
return total
|
return total
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,38 +1,15 @@
|
|||||||
package yggdrasil
|
package yggdrasil
|
||||||
|
|
||||||
/*
|
// A chord-like Distributed Hash Table (DHT).
|
||||||
|
// Used to look up coords given a NodeID and bitmask (taken from an IPv6 address).
|
||||||
This part has the (kademlia-like) distributed hash table
|
// Keeps track of immediate successor, predecessor, and all peers.
|
||||||
|
// Also keeps track of other nodes if they're closer in tree space than all other known nodes encountered when heading in either direction to that point, under the hypothesis that, for the kinds of networks we care about, this should probabilistically include the node needed to keep lookups to near O(logn) steps.
|
||||||
It's used to look up coords for a NodeID
|
|
||||||
|
|
||||||
Every node participates in the DHT, and the DHT stores no real keys/values
|
|
||||||
(Only the peer relationships / lookups are needed)
|
|
||||||
|
|
||||||
This version is intentionally fragile, by being recursive instead of iterative
|
|
||||||
(it's also not parallel, as a result)
|
|
||||||
This is to make sure that DHT black holes are visible if they exist
|
|
||||||
(the iterative parallel approach tends to get around them sometimes)
|
|
||||||
I haven't seen this get stuck on blackholes, but I also haven't proven it can't
|
|
||||||
Slight changes *do* make it blackhole hard, bootstrapping isn't an easy problem
|
|
||||||
|
|
||||||
*/
|
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sort"
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Number of DHT buckets, equal to the number of bits in a NodeID.
|
|
||||||
// Note that, in practice, nearly all of these will be empty.
|
|
||||||
const dht_bucket_number = 8 * NodeIDLen
|
|
||||||
|
|
||||||
// Number of nodes to keep in each DHT bucket.
|
|
||||||
// Additional entries may be kept for peers, for bootstrapping reasons, if they don't already have an entry in the bucket.
|
|
||||||
const dht_bucket_size = 2
|
|
||||||
|
|
||||||
// Number of responses to include in a lookup.
|
|
||||||
// If extras are given, they will be truncated from the response handler to prevent abuse.
|
|
||||||
const dht_lookup_size = 16
|
const dht_lookup_size = 16
|
||||||
|
|
||||||
// dhtInfo represents everything we know about a node in the DHT.
|
// dhtInfo represents everything we know about a node in the DHT.
|
||||||
@ -41,11 +18,9 @@ type dhtInfo struct {
|
|||||||
nodeID_hidden *NodeID
|
nodeID_hidden *NodeID
|
||||||
key boxPubKey
|
key boxPubKey
|
||||||
coords []byte
|
coords []byte
|
||||||
send time.Time // When we last sent a message
|
recv time.Time // When we last received a message
|
||||||
recv time.Time // When we last received a message
|
pings int // Time out if at least 3 consecutive maintenance pings drop
|
||||||
pings int // Decide when to drop
|
throttle time.Duration
|
||||||
throttle time.Duration // Time to wait before pinging a node to bootstrap buckets, increases exponentially from 1 second to 1 minute
|
|
||||||
bootstrapSend time.Time // The time checked/updated as part of throttle checks
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns the *NodeID associated with dhtInfo.key, calculating it on the fly the first time or from a cache all subsequent times.
|
// Returns the *NodeID associated with dhtInfo.key, calculating it on the fly the first time or from a cache all subsequent times.
|
||||||
@ -56,12 +31,6 @@ func (info *dhtInfo) getNodeID() *NodeID {
|
|||||||
return info.nodeID_hidden
|
return info.nodeID_hidden
|
||||||
}
|
}
|
||||||
|
|
||||||
// The nodes we known in a bucket (a region of keyspace with a matching prefix of some length).
|
|
||||||
type bucket struct {
|
|
||||||
peers []*dhtInfo
|
|
||||||
other []*dhtInfo
|
|
||||||
}
|
|
||||||
|
|
||||||
// Request for a node to do a lookup.
|
// Request for a node to do a lookup.
|
||||||
// Includes our key and coords so they can send a response back, and the destination NodeID we want to ask about.
|
// Includes our key and coords so they can send a response back, and the destination NodeID we want to ask about.
|
||||||
type dhtReq struct {
|
type dhtReq struct {
|
||||||
@ -74,30 +43,21 @@ type dhtReq struct {
|
|||||||
// Includes the key and coords of the node that's responding, and the destination they were asked about.
|
// Includes the key and coords of the node that's responding, and the destination they were asked about.
|
||||||
// The main part is Infos []*dhtInfo, the lookup response.
|
// The main part is Infos []*dhtInfo, the lookup response.
|
||||||
type dhtRes struct {
|
type dhtRes struct {
|
||||||
Key boxPubKey // key to respond to
|
Key boxPubKey // key of the sender
|
||||||
Coords []byte // coords to respond to
|
Coords []byte // coords of the sender
|
||||||
Dest NodeID
|
Dest NodeID
|
||||||
Infos []*dhtInfo // response
|
Infos []*dhtInfo // response
|
||||||
}
|
}
|
||||||
|
|
||||||
// Information about a node, either taken from our table or from a lookup response.
|
|
||||||
// Used to schedule pings at a later time (they're throttled to 1/second for background maintenance traffic).
|
|
||||||
type dht_rumor struct {
|
|
||||||
info *dhtInfo
|
|
||||||
target *NodeID
|
|
||||||
}
|
|
||||||
|
|
||||||
// The main DHT struct.
|
// The main DHT struct.
|
||||||
// Includes a slice of buckets, to organize known nodes based on their region of keyspace.
|
|
||||||
// Also includes information about outstanding DHT requests and the rumor mill of nodes to ping at some point.
|
|
||||||
type dht struct {
|
type dht struct {
|
||||||
core *Core
|
core *Core
|
||||||
nodeID NodeID
|
nodeID NodeID
|
||||||
buckets_hidden [dht_bucket_number]bucket // Extra is for the self-bucket
|
peers chan *dhtInfo // other goroutines put incoming dht updates here
|
||||||
peers chan *dhtInfo // other goroutines put incoming dht updates here
|
reqs map[boxPubKey]map[NodeID]time.Time
|
||||||
reqs map[boxPubKey]map[NodeID]time.Time
|
// These next two could be replaced by a single linked list or similar...
|
||||||
offset int
|
table map[NodeID]*dhtInfo
|
||||||
rumorMill []dht_rumor
|
imp []*dhtInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initializes the DHT.
|
// Initializes the DHT.
|
||||||
@ -105,11 +65,92 @@ func (t *dht) init(c *Core) {
|
|||||||
t.core = c
|
t.core = c
|
||||||
t.nodeID = *t.core.GetNodeID()
|
t.nodeID = *t.core.GetNodeID()
|
||||||
t.peers = make(chan *dhtInfo, 1024)
|
t.peers = make(chan *dhtInfo, 1024)
|
||||||
|
t.reset()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resets the DHT in response to coord changes.
|
||||||
|
// This empties all info from the DHT and drops outstanding requests.
|
||||||
|
func (t *dht) reset() {
|
||||||
t.reqs = make(map[boxPubKey]map[NodeID]time.Time)
|
t.reqs = make(map[boxPubKey]map[NodeID]time.Time)
|
||||||
|
t.table = make(map[NodeID]*dhtInfo)
|
||||||
|
t.imp = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Does a DHT lookup and returns up to dht_lookup_size results.
|
||||||
|
func (t *dht) lookup(nodeID *NodeID, everything bool) []*dhtInfo {
|
||||||
|
results := make([]*dhtInfo, 0, len(t.table))
|
||||||
|
for _, info := range t.table {
|
||||||
|
results = append(results, info)
|
||||||
|
}
|
||||||
|
sort.SliceStable(results, func(i, j int) bool {
|
||||||
|
return dht_ordered(nodeID, results[i].getNodeID(), results[j].getNodeID())
|
||||||
|
})
|
||||||
|
if len(results) > dht_lookup_size {
|
||||||
|
results = results[:dht_lookup_size]
|
||||||
|
}
|
||||||
|
return results
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert into table, preserving the time we last sent a packet if the node was already in the table, otherwise setting that time to now.
|
||||||
|
func (t *dht) insert(info *dhtInfo) {
|
||||||
|
if *info.getNodeID() == t.nodeID {
|
||||||
|
// This shouldn't happen, but don't add it if it does
|
||||||
|
return
|
||||||
|
panic("FIXME")
|
||||||
|
}
|
||||||
|
info.recv = time.Now()
|
||||||
|
if oldInfo, isIn := t.table[*info.getNodeID()]; isIn {
|
||||||
|
sameCoords := true
|
||||||
|
if len(info.coords) != len(oldInfo.coords) {
|
||||||
|
sameCoords = false
|
||||||
|
} else {
|
||||||
|
for idx := 0; idx < len(info.coords); idx++ {
|
||||||
|
if info.coords[idx] != oldInfo.coords[idx] {
|
||||||
|
sameCoords = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if sameCoords {
|
||||||
|
info.throttle = oldInfo.throttle
|
||||||
|
}
|
||||||
|
}
|
||||||
|
t.imp = nil // It needs to update to get a pointer to the new info
|
||||||
|
t.table[*info.getNodeID()] = info
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return true if first/second/third are (partially) ordered correctly.
|
||||||
|
func dht_ordered(first, second, third *NodeID) bool {
|
||||||
|
lessOrEqual := func(first, second *NodeID) bool {
|
||||||
|
for idx := 0; idx < NodeIDLen; idx++ {
|
||||||
|
if first[idx] > second[idx] {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if first[idx] < second[idx] {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
firstLessThanSecond := lessOrEqual(first, second)
|
||||||
|
secondLessThanThird := lessOrEqual(second, third)
|
||||||
|
thirdLessThanFirst := lessOrEqual(third, first)
|
||||||
|
switch {
|
||||||
|
case firstLessThanSecond && secondLessThanThird:
|
||||||
|
// Nothing wrapped around 0, the easy case
|
||||||
|
return true
|
||||||
|
case thirdLessThanFirst && firstLessThanSecond:
|
||||||
|
// Third wrapped around 0
|
||||||
|
return true
|
||||||
|
case secondLessThanThird && thirdLessThanFirst:
|
||||||
|
// Second (and third) wrapped around 0
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reads a request, performs a lookup, and responds.
|
// Reads a request, performs a lookup, and responds.
|
||||||
// If the node that sent the request isn't in our DHT, but should be, then we add them.
|
// Update info about the node that sent the request.
|
||||||
func (t *dht) handleReq(req *dhtReq) {
|
func (t *dht) handleReq(req *dhtReq) {
|
||||||
// Send them what they asked for
|
// Send them what they asked for
|
||||||
loc := t.core.switchTable.getLocator()
|
loc := t.core.switchTable.getLocator()
|
||||||
@ -121,19 +162,35 @@ func (t *dht) handleReq(req *dhtReq) {
|
|||||||
Infos: t.lookup(&req.Dest, false),
|
Infos: t.lookup(&req.Dest, false),
|
||||||
}
|
}
|
||||||
t.sendRes(&res, req)
|
t.sendRes(&res, req)
|
||||||
// Also (possibly) add them to our DHT
|
// Also add them to our DHT
|
||||||
info := dhtInfo{
|
info := dhtInfo{
|
||||||
key: req.Key,
|
key: req.Key,
|
||||||
coords: req.Coords,
|
coords: req.Coords,
|
||||||
}
|
}
|
||||||
// For bootstrapping to work, we need to add these nodes to the table
|
if _, isIn := t.table[*info.getNodeID()]; !isIn && t.isImportant(&info) {
|
||||||
// Using insertIfNew, they can lie about coords, but searches will route around them
|
t.insert(&info)
|
||||||
// Using the mill would mean trying to block off the mill becomes an attack vector
|
}
|
||||||
t.insertIfNew(&info, false)
|
}
|
||||||
|
|
||||||
|
// Sends a lookup response to the specified node.
|
||||||
|
func (t *dht) sendRes(res *dhtRes, req *dhtReq) {
|
||||||
|
// Send a reply for a dhtReq
|
||||||
|
bs := res.encode()
|
||||||
|
shared := t.core.sessions.getSharedKey(&t.core.boxPriv, &req.Key)
|
||||||
|
payload, nonce := boxSeal(shared, bs, nil)
|
||||||
|
p := wire_protoTrafficPacket{
|
||||||
|
Coords: req.Coords,
|
||||||
|
ToKey: req.Key,
|
||||||
|
FromKey: t.core.boxPub,
|
||||||
|
Nonce: *nonce,
|
||||||
|
Payload: payload,
|
||||||
|
}
|
||||||
|
packet := p.encode()
|
||||||
|
t.core.router.out(packet)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reads a lookup response, checks that we had sent a matching request, and processes the response info.
|
// Reads a lookup response, checks that we had sent a matching request, and processes the response info.
|
||||||
// This mainly consists of updating the node we asked in our DHT (they responded, so we know they're still alive), and adding the response info to the rumor mill.
|
// This mainly consists of updating the node we asked in our DHT (they responded, so we know they're still alive), and deciding if we want to do anything with their responses
|
||||||
func (t *dht) handleRes(res *dhtRes) {
|
func (t *dht) handleRes(res *dhtRes) {
|
||||||
t.core.searches.handleDHTRes(res)
|
t.core.searches.handleDHTRes(res)
|
||||||
reqs, isIn := t.reqs[res.Key]
|
reqs, isIn := t.reqs[res.Key]
|
||||||
@ -145,223 +202,25 @@ func (t *dht) handleRes(res *dhtRes) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
delete(reqs, res.Dest)
|
delete(reqs, res.Dest)
|
||||||
now := time.Now()
|
|
||||||
rinfo := dhtInfo{
|
rinfo := dhtInfo{
|
||||||
key: res.Key,
|
key: res.Key,
|
||||||
coords: res.Coords,
|
coords: res.Coords,
|
||||||
send: now, // Technically wrong but should be OK...
|
|
||||||
recv: now,
|
|
||||||
throttle: time.Second,
|
|
||||||
bootstrapSend: now,
|
|
||||||
}
|
}
|
||||||
// If they're already in the table, then keep the correct send time
|
if t.isImportant(&rinfo) {
|
||||||
bidx, isOK := t.getBucketIndex(rinfo.getNodeID())
|
t.insert(&rinfo)
|
||||||
if !isOK {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
b := t.getBucket(bidx)
|
|
||||||
for _, oldinfo := range b.peers {
|
|
||||||
if oldinfo.key == rinfo.key {
|
|
||||||
rinfo.send = oldinfo.send
|
|
||||||
rinfo.throttle = oldinfo.throttle
|
|
||||||
rinfo.bootstrapSend = oldinfo.bootstrapSend
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, oldinfo := range b.other {
|
|
||||||
if oldinfo.key == rinfo.key {
|
|
||||||
rinfo.send = oldinfo.send
|
|
||||||
rinfo.throttle = oldinfo.throttle
|
|
||||||
rinfo.bootstrapSend = oldinfo.bootstrapSend
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Insert into table
|
|
||||||
t.insert(&rinfo, false)
|
|
||||||
if res.Dest == *rinfo.getNodeID() {
|
|
||||||
return
|
|
||||||
} // No infinite recursions
|
|
||||||
if len(res.Infos) > dht_lookup_size {
|
|
||||||
// Ignore any "extra" lookup results
|
|
||||||
res.Infos = res.Infos[:dht_lookup_size]
|
|
||||||
}
|
}
|
||||||
for _, info := range res.Infos {
|
for _, info := range res.Infos {
|
||||||
if dht_firstCloserThanThird(info.getNodeID(), &res.Dest, rinfo.getNodeID()) {
|
if *info.getNodeID() == t.nodeID {
|
||||||
t.addToMill(info, info.getNodeID())
|
continue
|
||||||
}
|
} // Skip self
|
||||||
}
|
if _, isIn := t.table[*info.getNodeID()]; isIn {
|
||||||
}
|
// TODO? don't skip if coords are different?
|
||||||
|
|
||||||
// Does a DHT lookup and returns the results, sorted in ascending order of distance from the destination.
|
|
||||||
func (t *dht) lookup(nodeID *NodeID, allowCloser bool) []*dhtInfo {
|
|
||||||
// FIXME this allocates a bunch, sorts, and keeps the part it likes
|
|
||||||
// It would be better to only track the part it likes to begin with
|
|
||||||
addInfos := func(res []*dhtInfo, infos []*dhtInfo) []*dhtInfo {
|
|
||||||
for _, info := range infos {
|
|
||||||
if info == nil {
|
|
||||||
panic("Should never happen!")
|
|
||||||
}
|
|
||||||
if allowCloser || dht_firstCloserThanThird(info.getNodeID(), nodeID, &t.nodeID) {
|
|
||||||
res = append(res, info)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return res
|
|
||||||
}
|
|
||||||
var res []*dhtInfo
|
|
||||||
for bidx := 0; bidx < t.nBuckets(); bidx++ {
|
|
||||||
b := t.getBucket(bidx)
|
|
||||||
res = addInfos(res, b.peers)
|
|
||||||
res = addInfos(res, b.other)
|
|
||||||
}
|
|
||||||
doSort := func(infos []*dhtInfo) {
|
|
||||||
less := func(i, j int) bool {
|
|
||||||
return dht_firstCloserThanThird(infos[i].getNodeID(),
|
|
||||||
nodeID,
|
|
||||||
infos[j].getNodeID())
|
|
||||||
}
|
|
||||||
sort.SliceStable(infos, less)
|
|
||||||
}
|
|
||||||
doSort(res)
|
|
||||||
if len(res) > dht_lookup_size {
|
|
||||||
res = res[:dht_lookup_size]
|
|
||||||
}
|
|
||||||
return res
|
|
||||||
}
|
|
||||||
|
|
||||||
// Gets the bucket for a specified matching prefix length.
|
|
||||||
func (t *dht) getBucket(bidx int) *bucket {
|
|
||||||
return &t.buckets_hidden[bidx]
|
|
||||||
}
|
|
||||||
|
|
||||||
// Lists the number of buckets.
|
|
||||||
func (t *dht) nBuckets() int {
|
|
||||||
return len(t.buckets_hidden)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Inserts a node into the DHT if they meet certain requirements.
|
|
||||||
// In particular, they must either be a peer that's not already in the DHT, or else be someone we should insert into the DHT (see: shouldInsert).
|
|
||||||
func (t *dht) insertIfNew(info *dhtInfo, isPeer bool) {
|
|
||||||
// Insert if no "other" entry already exists
|
|
||||||
nodeID := info.getNodeID()
|
|
||||||
bidx, isOK := t.getBucketIndex(nodeID)
|
|
||||||
if !isOK {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
b := t.getBucket(bidx)
|
|
||||||
if (isPeer && !b.containsOther(info)) || t.shouldInsert(info) {
|
|
||||||
// We've never heard this node before
|
|
||||||
// TODO is there a better time than "now" to set send/recv to?
|
|
||||||
// (Is there another "natural" choice that bootstraps faster?)
|
|
||||||
info.send = time.Now()
|
|
||||||
info.recv = info.send
|
|
||||||
t.insert(info, isPeer)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Adds a node to the DHT, possibly removing another node in the process.
|
|
||||||
func (t *dht) insert(info *dhtInfo, isPeer bool) {
|
|
||||||
// First update the time on this info
|
|
||||||
info.recv = time.Now()
|
|
||||||
// Get the bucket for this node
|
|
||||||
nodeID := info.getNodeID()
|
|
||||||
bidx, isOK := t.getBucketIndex(nodeID)
|
|
||||||
if !isOK {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
b := t.getBucket(bidx)
|
|
||||||
if !isPeer && !b.containsOther(info) {
|
|
||||||
// This is a new entry, give it an old age so it's pinged sooner
|
|
||||||
// This speeds up bootstrapping
|
|
||||||
info.recv = info.recv.Add(-time.Hour)
|
|
||||||
}
|
|
||||||
if isPeer || info.throttle > time.Minute {
|
|
||||||
info.throttle = time.Minute
|
|
||||||
}
|
|
||||||
// First drop any existing entry from the bucket
|
|
||||||
b.drop(&info.key)
|
|
||||||
// Now add to the *end* of the bucket
|
|
||||||
if isPeer {
|
|
||||||
// TODO make sure we don't duplicate peers in b.other too
|
|
||||||
b.peers = append(b.peers, info)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
b.other = append(b.other, info)
|
|
||||||
// Shrink from the *front* to requied size
|
|
||||||
for len(b.other) > dht_bucket_size {
|
|
||||||
b.other = b.other[1:]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Gets the bucket index for the bucket where we would put the given NodeID.
|
|
||||||
func (t *dht) getBucketIndex(nodeID *NodeID) (int, bool) {
|
|
||||||
for bidx := 0; bidx < t.nBuckets(); bidx++ {
|
|
||||||
them := nodeID[bidx/8] & (0x80 >> byte(bidx%8))
|
|
||||||
me := t.nodeID[bidx/8] & (0x80 >> byte(bidx%8))
|
|
||||||
if them != me {
|
|
||||||
return bidx, true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return t.nBuckets(), false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Helper called by containsPeer, containsOther, and contains.
|
|
||||||
// Returns true if a node with the same ID *and coords* is already in the given part of the bucket.
|
|
||||||
func dht_bucket_check(newInfo *dhtInfo, infos []*dhtInfo) bool {
|
|
||||||
// Compares if key and coords match
|
|
||||||
if newInfo == nil {
|
|
||||||
panic("Should never happen")
|
|
||||||
}
|
|
||||||
for _, info := range infos {
|
|
||||||
if info == nil {
|
|
||||||
panic("Should never happen")
|
|
||||||
}
|
|
||||||
if info.key != newInfo.key {
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if len(info.coords) != len(newInfo.coords) {
|
if t.isImportant(info) {
|
||||||
continue
|
t.ping(info, nil)
|
||||||
}
|
|
||||||
match := true
|
|
||||||
for idx := 0; idx < len(info.coords); idx++ {
|
|
||||||
if info.coords[idx] != newInfo.coords[idx] {
|
|
||||||
match = false
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if match {
|
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Calls bucket_check over the bucket's peers infos.
|
|
||||||
func (b *bucket) containsPeer(info *dhtInfo) bool {
|
|
||||||
return dht_bucket_check(info, b.peers)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Calls bucket_check over the bucket's other info.
|
|
||||||
func (b *bucket) containsOther(info *dhtInfo) bool {
|
|
||||||
return dht_bucket_check(info, b.other)
|
|
||||||
}
|
|
||||||
|
|
||||||
// returns containsPeer || containsOther
|
|
||||||
func (b *bucket) contains(info *dhtInfo) bool {
|
|
||||||
return b.containsPeer(info) || b.containsOther(info)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Removes a node with the corresponding key, if any, from a bucket.
|
|
||||||
func (b *bucket) drop(key *boxPubKey) {
|
|
||||||
clean := func(infos []*dhtInfo) []*dhtInfo {
|
|
||||||
cleaned := infos[:0]
|
|
||||||
for _, info := range infos {
|
|
||||||
if info.key == *key {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
cleaned = append(cleaned, info)
|
|
||||||
}
|
|
||||||
return cleaned
|
|
||||||
}
|
|
||||||
b.peers = clean(b.peers)
|
|
||||||
b.other = clean(b.other)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sends a lookup request to the specified node.
|
// Sends a lookup request to the specified node.
|
||||||
@ -390,73 +249,9 @@ func (t *dht) sendReq(req *dhtReq, dest *dhtInfo) {
|
|||||||
reqsToDest[req.Dest] = time.Now()
|
reqsToDest[req.Dest] = time.Now()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sends a lookup response to the specified node.
|
// Sends a lookup to this info, looking for the target.
|
||||||
func (t *dht) sendRes(res *dhtRes, req *dhtReq) {
|
|
||||||
// Send a reply for a dhtReq
|
|
||||||
bs := res.encode()
|
|
||||||
shared := t.core.sessions.getSharedKey(&t.core.boxPriv, &req.Key)
|
|
||||||
payload, nonce := boxSeal(shared, bs, nil)
|
|
||||||
p := wire_protoTrafficPacket{
|
|
||||||
Coords: req.Coords,
|
|
||||||
ToKey: req.Key,
|
|
||||||
FromKey: t.core.boxPub,
|
|
||||||
Nonce: *nonce,
|
|
||||||
Payload: payload,
|
|
||||||
}
|
|
||||||
packet := p.encode()
|
|
||||||
t.core.router.out(packet)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Returns true of a bucket contains no peers and no other nodes.
|
|
||||||
func (b *bucket) isEmpty() bool {
|
|
||||||
return len(b.peers)+len(b.other) == 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// Gets the next node that should be pinged from the bucket.
|
|
||||||
// There's a cooldown of 6 seconds between ping attempts for each node, to give them time to respond.
|
|
||||||
// It returns the least recently pinged node, subject to that send cooldown.
|
|
||||||
func (b *bucket) nextToPing() *dhtInfo {
|
|
||||||
// Check the nodes in the bucket
|
|
||||||
// Return whichever one responded least recently
|
|
||||||
// Delay of 6 seconds between pinging the same node
|
|
||||||
// Gives them time to respond
|
|
||||||
// And time between traffic loss from short term congestion in the network
|
|
||||||
var toPing *dhtInfo
|
|
||||||
update := func(infos []*dhtInfo) {
|
|
||||||
for _, next := range infos {
|
|
||||||
if time.Since(next.send) < 6*time.Second {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if toPing == nil || next.recv.Before(toPing.recv) {
|
|
||||||
toPing = next
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
update(b.peers)
|
|
||||||
update(b.other)
|
|
||||||
return toPing
|
|
||||||
}
|
|
||||||
|
|
||||||
// Returns a useful target address to ask about for pings.
|
|
||||||
// Equal to the our node's ID, except for exactly 1 bit at the bucket index.
|
|
||||||
func (t *dht) getTarget(bidx int) *NodeID {
|
|
||||||
targetID := t.nodeID
|
|
||||||
targetID[bidx/8] ^= 0x80 >> byte(bidx%8)
|
|
||||||
return &targetID
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sends a ping to a node, or removes the node if it has failed to respond to too many pings.
|
|
||||||
// If target is nil, we will ask the node about our own NodeID.
|
|
||||||
func (t *dht) ping(info *dhtInfo, target *NodeID) {
|
func (t *dht) ping(info *dhtInfo, target *NodeID) {
|
||||||
if info.pings > 2 {
|
// Creates a req for the node at dhtInfo, asking them about the target (if one is given) or themself (if no target is given)
|
||||||
bidx, isOK := t.getBucketIndex(info.getNodeID())
|
|
||||||
if !isOK {
|
|
||||||
panic("This should never happen")
|
|
||||||
}
|
|
||||||
b := t.getBucket(bidx)
|
|
||||||
b.drop(&info.key)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if target == nil {
|
if target == nil {
|
||||||
target = &t.nodeID
|
target = &t.nodeID
|
||||||
}
|
}
|
||||||
@ -467,160 +262,103 @@ func (t *dht) ping(info *dhtInfo, target *NodeID) {
|
|||||||
Coords: coords,
|
Coords: coords,
|
||||||
Dest: *target,
|
Dest: *target,
|
||||||
}
|
}
|
||||||
info.pings++
|
|
||||||
info.send = time.Now()
|
|
||||||
t.sendReq(&req, info)
|
t.sendReq(&req, info)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Adds a node info and target to the rumor mill.
|
// Periodic maintenance work to keep important DHT nodes alive.
|
||||||
// The node will be asked about the target at a later point, if doing so would still be useful at the time.
|
|
||||||
func (t *dht) addToMill(info *dhtInfo, target *NodeID) {
|
|
||||||
rumor := dht_rumor{
|
|
||||||
info: info,
|
|
||||||
target: target,
|
|
||||||
}
|
|
||||||
t.rumorMill = append(t.rumorMill, rumor)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Regular periodic maintenance.
|
|
||||||
// If the mill is empty, it adds two pings to the rumor mill.
|
|
||||||
// The first is to the node that responded least recently, provided that it's been at least 1 minute, to make sure we eventually detect and remove unresponsive nodes.
|
|
||||||
// The second is used for bootstrapping, and attempts to fill some bucket, iterating over buckets and resetting after it hits the last non-empty one.
|
|
||||||
// If the mill is not empty, it pops nodes from the mill until it finds one that would be useful to ping (see: shouldInsert), and then pings it.
|
|
||||||
func (t *dht) doMaintenance() {
|
func (t *dht) doMaintenance() {
|
||||||
// First clean up reqs
|
now := time.Now()
|
||||||
for key, reqs := range t.reqs {
|
for infoID, info := range t.table {
|
||||||
for target, timeout := range reqs {
|
if now.Sub(info.recv) > time.Minute || info.pings > 3 {
|
||||||
if time.Since(timeout) > time.Minute {
|
delete(t.table, infoID)
|
||||||
delete(reqs, target)
|
t.imp = nil
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(reqs) == 0 {
|
|
||||||
delete(t.reqs, key)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(t.rumorMill) == 0 {
|
for _, info := range t.getImportant() {
|
||||||
// Ping the least recently contacted node
|
if now.Sub(info.recv) > info.throttle {
|
||||||
// This is to make sure we eventually notice when someone times out
|
t.ping(info, info.getNodeID())
|
||||||
var oldest *dhtInfo
|
info.pings++
|
||||||
last := 0
|
info.throttle += time.Second
|
||||||
for bidx := 0; bidx < t.nBuckets(); bidx++ {
|
if info.throttle > 30*time.Second {
|
||||||
b := t.getBucket(bidx)
|
info.throttle = 30 * time.Second
|
||||||
if !b.isEmpty() {
|
|
||||||
last = bidx
|
|
||||||
toPing := b.nextToPing()
|
|
||||||
if toPing == nil {
|
|
||||||
continue
|
|
||||||
} // We've recently pinged everyone in b
|
|
||||||
if oldest == nil || toPing.recv.Before(oldest.recv) {
|
|
||||||
oldest = toPing
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if oldest != nil && time.Since(oldest.recv) > time.Minute {
|
|
||||||
// Ping the oldest node in the DHT, but don't ping nodes that have been checked within the last minute
|
|
||||||
t.addToMill(oldest, nil)
|
|
||||||
}
|
|
||||||
// Refresh buckets
|
|
||||||
if t.offset > last {
|
|
||||||
t.offset = 0
|
|
||||||
}
|
|
||||||
target := t.getTarget(t.offset)
|
|
||||||
func() {
|
|
||||||
closer := t.lookup(target, false)
|
|
||||||
for _, info := range closer {
|
|
||||||
// Throttled ping of a node that's closer to the destination
|
|
||||||
if time.Since(info.recv) > info.throttle {
|
|
||||||
t.addToMill(info, target)
|
|
||||||
t.offset++
|
|
||||||
info.bootstrapSend = time.Now()
|
|
||||||
info.throttle *= 2
|
|
||||||
if info.throttle > time.Minute {
|
|
||||||
info.throttle = time.Minute
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(closer) == 0 {
|
|
||||||
// If we don't know of anyone closer at all, then there's a hole in our dht
|
|
||||||
// Ping the closest node we know and ignore the throttle, to try to fill it
|
|
||||||
for _, info := range t.lookup(target, true) {
|
|
||||||
t.addToMill(info, target)
|
|
||||||
t.offset++
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
//t.offset++
|
|
||||||
}
|
|
||||||
for len(t.rumorMill) > 0 {
|
|
||||||
var rumor dht_rumor
|
|
||||||
rumor, t.rumorMill = t.rumorMill[0], t.rumorMill[1:]
|
|
||||||
if rumor.target == rumor.info.getNodeID() {
|
|
||||||
// Note that the above is a pointer comparison, and target can be nil
|
|
||||||
// This is only for adding new nodes (learned from other lookups)
|
|
||||||
// It only makes sense to ping if the node isn't already in the table
|
|
||||||
if !t.shouldInsert(rumor.info) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
t.ping(rumor.info, rumor.target)
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns true if it would be worth pinging the specified node.
|
// Gets a list of important nodes, used by isImportant.
|
||||||
// This requires that the bucket doesn't already contain the node, and that either the bucket isn't full yet or the node is closer to us in keyspace than some other node in that bucket.
|
func (t *dht) getImportant() []*dhtInfo {
|
||||||
func (t *dht) shouldInsert(info *dhtInfo) bool {
|
if t.imp == nil {
|
||||||
bidx, isOK := t.getBucketIndex(info.getNodeID())
|
// Get a list of all known nodes
|
||||||
if !isOK {
|
infos := make([]*dhtInfo, 0, len(t.table))
|
||||||
return false
|
for _, info := range t.table {
|
||||||
|
infos = append(infos, info)
|
||||||
|
}
|
||||||
|
// Sort them by increasing order in distance along the ring
|
||||||
|
sort.SliceStable(infos, func(i, j int) bool {
|
||||||
|
// Sort in order of predecessors (!), reverse from chord normal, because it plays nicer with zero bits for unknown parts of target addresses
|
||||||
|
return dht_ordered(infos[j].getNodeID(), infos[i].getNodeID(), &t.nodeID)
|
||||||
|
})
|
||||||
|
// Keep the ones that are no further than the closest seen so far
|
||||||
|
minDist := ^uint64(0)
|
||||||
|
loc := t.core.switchTable.getLocator()
|
||||||
|
important := infos[:0]
|
||||||
|
for _, info := range infos {
|
||||||
|
dist := uint64(loc.dist(info.coords))
|
||||||
|
if dist < minDist {
|
||||||
|
minDist = dist
|
||||||
|
important = append(important, info)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
var temp []*dhtInfo
|
||||||
|
minDist = ^uint64(0)
|
||||||
|
for idx := len(infos) - 1; idx >= 0; idx-- {
|
||||||
|
info := infos[idx]
|
||||||
|
dist := uint64(loc.dist(info.coords))
|
||||||
|
if dist < minDist {
|
||||||
|
minDist = dist
|
||||||
|
temp = append(temp, info)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for idx := len(temp) - 1; idx >= 0; idx-- {
|
||||||
|
important = append(important, temp[idx])
|
||||||
|
}
|
||||||
|
t.imp = important
|
||||||
}
|
}
|
||||||
b := t.getBucket(bidx)
|
return t.imp
|
||||||
if b.containsOther(info) {
|
}
|
||||||
return false
|
|
||||||
}
|
// Returns true if this is a node we need to keep track of for the DHT to work.
|
||||||
if len(b.other) < dht_bucket_size {
|
func (t *dht) isImportant(ninfo *dhtInfo) bool {
|
||||||
return true
|
important := t.getImportant()
|
||||||
}
|
// Check if ninfo is of equal or greater importance to what we already know
|
||||||
for _, other := range b.other {
|
loc := t.core.switchTable.getLocator()
|
||||||
if dht_firstCloserThanThird(info.getNodeID(), &t.nodeID, other.getNodeID()) {
|
ndist := uint64(loc.dist(ninfo.coords))
|
||||||
|
minDist := ^uint64(0)
|
||||||
|
for _, info := range important {
|
||||||
|
if (*info.getNodeID() == *ninfo.getNodeID()) ||
|
||||||
|
(ndist < minDist && dht_ordered(info.getNodeID(), ninfo.getNodeID(), &t.nodeID)) {
|
||||||
|
// Either the same node, or a better one
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
dist := uint64(loc.dist(info.coords))
|
||||||
|
if dist < minDist {
|
||||||
|
minDist = dist
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
minDist = ^uint64(0)
|
||||||
|
for idx := len(important) - 1; idx >= 0; idx-- {
|
||||||
|
info := important[idx]
|
||||||
|
if (*info.getNodeID() == *ninfo.getNodeID()) ||
|
||||||
|
(ndist < minDist && dht_ordered(&t.nodeID, ninfo.getNodeID(), info.getNodeID())) {
|
||||||
|
// Either the same node, or a better one
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
dist := uint64(loc.dist(info.coords))
|
||||||
|
if dist < minDist {
|
||||||
|
minDist = dist
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// We didn't find any important node that ninfo is better than
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns true if the keyspace distance between the first and second node is smaller than the keyspace distance between the second and third node.
|
|
||||||
func dht_firstCloserThanThird(first *NodeID,
|
|
||||||
second *NodeID,
|
|
||||||
third *NodeID) bool {
|
|
||||||
for idx := 0; idx < NodeIDLen; idx++ {
|
|
||||||
f := first[idx] ^ second[idx]
|
|
||||||
t := third[idx] ^ second[idx]
|
|
||||||
if f == t {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
return f < t
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Resets the DHT in response to coord changes.
|
|
||||||
// This empties all buckets, resets the bootstrapping cycle to 0, and empties the rumor mill.
|
|
||||||
// It adds all old "other" node info to the rumor mill, so they'll be pinged quickly.
|
|
||||||
// If those nodes haven't also changed coords, then this is a relatively quick way to notify those nodes of our new coords and re-add them to our own DHT if they respond.
|
|
||||||
func (t *dht) reset() {
|
|
||||||
// This is mostly so bootstrapping will reset to resend coords into the network
|
|
||||||
t.offset = 0
|
|
||||||
t.rumorMill = nil // reset mill
|
|
||||||
for _, b := range t.buckets_hidden {
|
|
||||||
b.peers = b.peers[:0]
|
|
||||||
for _, info := range b.other {
|
|
||||||
// Add other nodes to the rumor mill so they'll be pinged soon
|
|
||||||
// This will hopefully tell them our coords and re-learn theirs quickly if they haven't changed
|
|
||||||
t.addToMill(info, info.getNodeID())
|
|
||||||
}
|
|
||||||
b.other = b.other[:0]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -186,8 +186,11 @@ func (p *peer) linkLoop() {
|
|||||||
}
|
}
|
||||||
p.sendSwitchMsg()
|
p.sendSwitchMsg()
|
||||||
case _ = <-tick.C:
|
case _ = <-tick.C:
|
||||||
if p.dinfo != nil {
|
//break // FIXME disabled the below completely to test something
|
||||||
p.core.dht.peers <- p.dinfo
|
pdinfo := p.dinfo // FIXME this is a bad workarond NPE on the next line
|
||||||
|
if pdinfo != nil {
|
||||||
|
dinfo := *pdinfo
|
||||||
|
p.core.dht.peers <- &dinfo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -333,7 +336,7 @@ func (p *peer) handleSwitchMsg(packet []byte) {
|
|||||||
key: p.box,
|
key: p.box,
|
||||||
coords: loc.getCoords(),
|
coords: loc.getCoords(),
|
||||||
}
|
}
|
||||||
p.core.dht.peers <- &dinfo
|
//p.core.dht.peers <- &dinfo
|
||||||
p.dinfo = &dinfo
|
p.dinfo = &dinfo
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -96,8 +96,12 @@ func (r *router) mainLoop() {
|
|||||||
case p := <-r.send:
|
case p := <-r.send:
|
||||||
r.sendPacket(p)
|
r.sendPacket(p)
|
||||||
case info := <-r.core.dht.peers:
|
case info := <-r.core.dht.peers:
|
||||||
r.core.dht.insertIfNew(info, false) // Insert as a normal node
|
now := time.Now()
|
||||||
r.core.dht.insertIfNew(info, true) // Insert as a peer
|
oldInfo, isIn := r.core.dht.table[*info.getNodeID()]
|
||||||
|
r.core.dht.insert(info)
|
||||||
|
if isIn && now.Sub(oldInfo.recv) < 45*time.Second {
|
||||||
|
info.recv = oldInfo.recv
|
||||||
|
}
|
||||||
case <-r.reset:
|
case <-r.reset:
|
||||||
r.core.sessions.resetInits()
|
r.core.sessions.resetInits()
|
||||||
r.core.dht.reset()
|
r.core.dht.reset()
|
||||||
|
@ -11,6 +11,9 @@ package yggdrasil
|
|||||||
// A new search packet is sent immediately after receiving a response
|
// A new search packet is sent immediately after receiving a response
|
||||||
// A new search packet is sent periodically, once per second, in case a packet was dropped (this slowly causes the search to become parallel if the search doesn't timeout but also doesn't finish within 1 second for whatever reason)
|
// A new search packet is sent periodically, once per second, in case a packet was dropped (this slowly causes the search to become parallel if the search doesn't timeout but also doesn't finish within 1 second for whatever reason)
|
||||||
|
|
||||||
|
// TODO?
|
||||||
|
// Some kind of max search steps, in case the node is offline, so we don't crawl through too much of the network looking for a destination that isn't there?
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sort"
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
@ -88,11 +91,13 @@ func (s *searches) handleDHTRes(res *dhtRes) {
|
|||||||
func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) {
|
func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) {
|
||||||
// Add responses to toVisit if closer to dest than the res node
|
// Add responses to toVisit if closer to dest than the res node
|
||||||
from := dhtInfo{key: res.Key, coords: res.Coords}
|
from := dhtInfo{key: res.Key, coords: res.Coords}
|
||||||
|
sinfo.visited[*from.getNodeID()] = true
|
||||||
for _, info := range res.Infos {
|
for _, info := range res.Infos {
|
||||||
if sinfo.visited[*info.getNodeID()] {
|
if *info.getNodeID() == s.core.dht.nodeID || sinfo.visited[*info.getNodeID()] {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if dht_firstCloserThanThird(info.getNodeID(), &res.Dest, from.getNodeID()) {
|
if dht_ordered(&sinfo.dest, info.getNodeID(), from.getNodeID()) {
|
||||||
|
// Response is closer to the destination
|
||||||
sinfo.toVisit = append(sinfo.toVisit, info)
|
sinfo.toVisit = append(sinfo.toVisit, info)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -107,7 +112,8 @@ func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) {
|
|||||||
}
|
}
|
||||||
// Sort
|
// Sort
|
||||||
sort.SliceStable(sinfo.toVisit, func(i, j int) bool {
|
sort.SliceStable(sinfo.toVisit, func(i, j int) bool {
|
||||||
return dht_firstCloserThanThird(sinfo.toVisit[i].getNodeID(), &res.Dest, sinfo.toVisit[j].getNodeID())
|
// Should return true if i is closer to the destination than j
|
||||||
|
return dht_ordered(&res.Dest, sinfo.toVisit[i].getNodeID(), sinfo.toVisit[j].getNodeID())
|
||||||
})
|
})
|
||||||
// Truncate to some maximum size
|
// Truncate to some maximum size
|
||||||
if len(sinfo.toVisit) > search_MAX_SEARCH_SIZE {
|
if len(sinfo.toVisit) > search_MAX_SEARCH_SIZE {
|
||||||
@ -126,11 +132,7 @@ func (s *searches) doSearchStep(sinfo *searchInfo) {
|
|||||||
// Send to the next search target
|
// Send to the next search target
|
||||||
var next *dhtInfo
|
var next *dhtInfo
|
||||||
next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:]
|
next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:]
|
||||||
var oldPings int
|
|
||||||
oldPings, next.pings = next.pings, 0
|
|
||||||
s.core.dht.ping(next, &sinfo.dest)
|
s.core.dht.ping(next, &sinfo.dest)
|
||||||
next.pings = oldPings // Don't evict a node for searching with it too much
|
|
||||||
sinfo.visited[*next.getNodeID()] = true
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -358,7 +358,7 @@ func (ss *sessions) getSharedKey(myPriv *boxPrivKey,
|
|||||||
return skey
|
return skey
|
||||||
}
|
}
|
||||||
// First do some cleanup
|
// First do some cleanup
|
||||||
const maxKeys = dht_bucket_number * dht_bucket_size
|
const maxKeys = 1024
|
||||||
for key := range ss.permShared {
|
for key := range ss.permShared {
|
||||||
// Remove a random key until the store is small enough
|
// Remove a random key until the store is small enough
|
||||||
if len(ss.permShared) < maxKeys {
|
if len(ss.permShared) < maxKeys {
|
||||||
|
@ -158,9 +158,9 @@ type switchTable struct {
|
|||||||
core *Core
|
core *Core
|
||||||
key sigPubKey // Our own key
|
key sigPubKey // Our own key
|
||||||
time time.Time // Time when locator.tstamp was last updated
|
time time.Time // Time when locator.tstamp was last updated
|
||||||
parent switchPort // Port of whatever peer is our parent, or self if we're root
|
|
||||||
drop map[sigPubKey]int64 // Tstamp associated with a dropped root
|
drop map[sigPubKey]int64 // Tstamp associated with a dropped root
|
||||||
mutex sync.RWMutex // Lock for reads/writes of switchData
|
mutex sync.RWMutex // Lock for reads/writes of switchData
|
||||||
|
parent switchPort // Port of whatever peer is our parent, or self if we're root
|
||||||
data switchData //
|
data switchData //
|
||||||
updater atomic.Value // *sync.Once
|
updater atomic.Value // *sync.Once
|
||||||
table atomic.Value // lookupTable
|
table atomic.Value // lookupTable
|
||||||
|
Loading…
Reference in New Issue
Block a user