5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-25 23:01:38 +00:00
yggdrasil-go/src/yggdrasil/dht.go

496 lines
12 KiB
Go

package yggdrasil
/*
This part has the (kademlia-like) distributed hash table
It's used to look up coords for a NodeID
Every node participates in the DHT, and the DHT stores no real keys/values
(Only the peer relationships / lookups are needed)
This version is intentionally fragile, by being recursive instead of iterative
(it's also not parallel, as a result)
This is to make sure that DHT black holes are visible if they exist
(the iterative parallel approach tends to get around them sometimes)
I haven't seen this get stuck on blackholes, but I also haven't proven it can't
Slight changes *do* make it blackhole hard, bootstrapping isn't an easy problem
*/
import "sort"
import "time"
//import "fmt"
// Maximum size for buckets and lookups
// Exception for buckets if the next one is non-full
const dht_bucket_size = 2 // This should be at least 2
const dht_lookup_size = 2 // This should be at least 1, below 2 is impractical
const dht_bucket_number = 8 * NodeIDLen // This shouldn't be changed
type dhtInfo struct {
nodeID_hidden *NodeID
key boxPubKey
coords []byte
send time.Time // When we last sent a message
recv time.Time // When we last received a message
pings int // Decide when to drop
}
func (info *dhtInfo) getNodeID() *NodeID {
if info.nodeID_hidden == nil {
info.nodeID_hidden = getNodeID(&info.key)
}
return info.nodeID_hidden
}
type bucket struct {
peers []*dhtInfo
other []*dhtInfo
}
type dhtReq struct {
key boxPubKey // Key of whoever asked
coords []byte // Coords of whoever asked
dest NodeID // NodeID they're asking about
}
type dhtRes struct {
key boxPubKey // key to respond to
coords []byte // coords to respond to
dest NodeID
infos []*dhtInfo // response
}
type dht_rumor struct {
info *dhtInfo
target *NodeID
}
type dht struct {
core *Core
nodeID NodeID
buckets_hidden [dht_bucket_number]bucket // Extra is for the self-bucket
peers chan *dhtInfo // other goroutines put incoming dht updates here
reqs map[boxPubKey]map[NodeID]time.Time
offset int
rumorMill []dht_rumor
}
func (t *dht) init(c *Core) {
t.core = c
t.nodeID = *t.core.GetNodeID()
t.peers = make(chan *dhtInfo, 1)
t.reqs = make(map[boxPubKey]map[NodeID]time.Time)
}
func (t *dht) handleReq(req *dhtReq) {
// Send them what they asked for
loc := t.core.switchTable.getLocator()
coords := loc.getCoords()
res := dhtRes{
key: t.core.boxPub,
coords: coords,
dest: req.dest,
infos: t.lookup(&req.dest),
}
t.sendRes(&res, req)
// Also (possibly) add them to our DHT
info := dhtInfo{
key: req.key,
coords: req.coords,
}
t.insertIfNew(&info, false) // This seems DoSable (we just trust their coords...)
//if req.dest != t.nodeID { t.ping(&info, info.getNodeID()) } // Or spam...
}
func (t *dht) handleRes(res *dhtRes) {
reqs, isIn := t.reqs[res.key]
if !isIn {
return
}
_, isIn = reqs[res.dest]
if !isIn {
return
}
rinfo := dhtInfo{
key: res.key,
coords: res.coords,
send: time.Now(), // Technically wrong but should be OK...
recv: time.Now(),
}
// If they're already in the table, then keep the correct send time
bidx, isOK := t.getBucketIndex(rinfo.getNodeID())
if !isOK {
return
}
b := t.getBucket(bidx)
for _, oldinfo := range b.peers {
if oldinfo.key == rinfo.key {
rinfo.send = oldinfo.send
}
}
for _, oldinfo := range b.other {
if oldinfo.key == rinfo.key {
rinfo.send = oldinfo.send
}
}
// Insert into table
t.insert(&rinfo, false)
if res.dest == *rinfo.getNodeID() {
return
} // No infinite recursions
// ping the nodes we were told about
if len(res.infos) > dht_lookup_size {
// Ignore any "extra" lookup results
res.infos = res.infos[:dht_lookup_size]
}
for _, info := range res.infos {
bidx, isOK := t.getBucketIndex(info.getNodeID())
if !isOK {
continue
}
b := t.getBucket(bidx)
if b.contains(info) {
continue
} // wait for maintenance cycle to get them
t.addToMill(info, info.getNodeID())
}
}
func (t *dht) lookup(nodeID *NodeID) []*dhtInfo {
// FIXME this allocates a bunch, sorts, and keeps the part it likes
// It would be better to only track the part it likes to begin with
addInfos := func(res []*dhtInfo, infos []*dhtInfo) []*dhtInfo {
for _, info := range infos {
if info == nil {
panic("Should never happen!")
}
if true || dht_firstCloserThanThird(info.getNodeID(), nodeID, &t.nodeID) {
res = append(res, info)
}
}
return res
}
var res []*dhtInfo
for bidx := 0; bidx < t.nBuckets(); bidx++ {
b := t.getBucket(bidx)
res = addInfos(res, b.peers)
res = addInfos(res, b.other)
}
doSort := func(infos []*dhtInfo) {
less := func(i, j int) bool {
return dht_firstCloserThanThird(infos[i].getNodeID(),
nodeID,
infos[j].getNodeID())
}
sort.SliceStable(infos, less)
}
doSort(res)
if len(res) > dht_lookup_size {
res = res[:dht_lookup_size]
}
return res
}
func (t *dht) getBucket(bidx int) *bucket {
return &t.buckets_hidden[bidx]
}
func (t *dht) nBuckets() int {
return len(t.buckets_hidden)
}
func (t *dht) insertIfNew(info *dhtInfo, isPeer bool) {
//fmt.Println("DEBUG: dht insertIfNew:", info.getNodeID(), info.coords)
// Insert a peer if and only if the bucket doesn't already contain it
nodeID := info.getNodeID()
bidx, isOK := t.getBucketIndex(nodeID)
if !isOK {
return
}
b := t.getBucket(bidx)
if !b.contains(info) {
// We've never heard this node before
// TODO is there a better time than "now" to set send/recv to?
// (Is there another "natural" choice that bootstraps faster?)
info.send = time.Now()
info.recv = info.send
t.insert(info, isPeer)
}
}
func (t *dht) insert(info *dhtInfo, isPeer bool) {
//fmt.Println("DEBUG: dht insert:", info.getNodeID(), info.coords)
// First update the time on this info
info.recv = time.Now()
// Get the bucket for this node
nodeID := info.getNodeID()
bidx, isOK := t.getBucketIndex(nodeID)
if !isOK {
return
}
b := t.getBucket(bidx)
// First drop any existing entry from the bucket
b.drop(&info.key)
// Now add to the *end* of the bucket
if isPeer {
// TODO make sure we don't duplicate peers in b.other too
b.peers = append(b.peers, info)
return
}
b.other = append(b.other, info)
// Check if the next bucket is non-full and return early if it is
if bidx+1 == t.nBuckets() {
return
}
bnext := t.getBucket(bidx + 1)
if len(bnext.other) < dht_bucket_size {
return
}
// Shrink from the *front* to requied size
for len(b.other) > dht_bucket_size {
b.other = b.other[1:]
}
}
func (t *dht) getBucketIndex(nodeID *NodeID) (int, bool) {
for bidx := 0; bidx < t.nBuckets(); bidx++ {
them := nodeID[bidx/8] & (0x80 >> byte(bidx%8))
me := t.nodeID[bidx/8] & (0x80 >> byte(bidx%8))
if them != me {
return bidx, true
}
}
return t.nBuckets(), false
}
func (b *bucket) contains(ninfo *dhtInfo) bool {
// Compares if key and coords match
var found bool
check := func(infos []*dhtInfo) {
for _, info := range infos {
if info == nil {
panic("Should never happen")
}
if info.key != info.key {
continue
}
if len(info.coords) != len(ninfo.coords) {
continue
}
match := true
for idx := 0; idx < len(info.coords); idx++ {
if info.coords[idx] != ninfo.coords[idx] {
match = false
break
}
}
if match {
found = true
break
}
}
}
check(b.peers)
check(b.other)
return found
}
func (b *bucket) drop(key *boxPubKey) {
clean := func(infos []*dhtInfo) []*dhtInfo {
cleaned := infos[:0]
for _, info := range infos {
if info.key == *key {
continue
}
cleaned = append(cleaned, info)
}
return cleaned
}
b.peers = clean(b.peers)
b.other = clean(b.other)
}
func (t *dht) sendReq(req *dhtReq, dest *dhtInfo) {
// Send a dhtReq to the node in dhtInfo
bs := req.encode()
shared := t.core.sessions.getSharedKey(&t.core.boxPriv, &dest.key)
payload, nonce := boxSeal(shared, bs, nil)
p := wire_protoTrafficPacket{
ttl: ^uint64(0),
coords: dest.coords,
toKey: dest.key,
fromKey: t.core.boxPub,
nonce: *nonce,
payload: payload,
}
packet := p.encode()
t.core.router.out(packet)
reqsToDest, isIn := t.reqs[dest.key]
if !isIn {
t.reqs[dest.key] = make(map[NodeID]time.Time)
reqsToDest, isIn = t.reqs[dest.key]
if !isIn {
panic("This should never happen")
}
}
reqsToDest[req.dest] = time.Now()
}
func (t *dht) sendRes(res *dhtRes, req *dhtReq) {
// Send a reply for a dhtReq
bs := res.encode()
shared := t.core.sessions.getSharedKey(&t.core.boxPriv, &req.key)
payload, nonce := boxSeal(shared, bs, nil)
p := wire_protoTrafficPacket{
ttl: ^uint64(0),
coords: req.coords,
toKey: req.key,
fromKey: t.core.boxPub,
nonce: *nonce,
payload: payload,
}
packet := p.encode()
t.core.router.out(packet)
}
func (b *bucket) isEmpty() bool {
return len(b.peers)+len(b.other) == 0
}
func (b *bucket) nextToPing() *dhtInfo {
// Check the nodes in the bucket
// Return whichever one responded least recently
// Delay of 6 seconds between pinging the same node
// Gives them time to respond
// And time between traffic loss from short term congestion in the network
var toPing *dhtInfo
update := func(infos []*dhtInfo) {
for _, next := range infos {
if time.Since(next.send) < 6*time.Second {
continue
}
if toPing == nil || next.recv.Before(toPing.recv) {
toPing = next
}
}
}
update(b.peers)
update(b.other)
return toPing
}
func (t *dht) getTarget(bidx int) *NodeID {
targetID := t.nodeID
targetID[bidx/8] ^= 0x80 >> byte(bidx%8)
return &targetID
}
func (t *dht) ping(info *dhtInfo, target *NodeID) {
if info.pings > 2 {
bidx, isOK := t.getBucketIndex(info.getNodeID())
if !isOK {
panic("This should never happen")
}
b := t.getBucket(bidx)
b.drop(&info.key)
return
}
if target == nil {
target = &t.nodeID
}
loc := t.core.switchTable.getLocator()
coords := loc.getCoords()
req := dhtReq{
key: t.core.boxPub,
coords: coords,
dest: *target,
}
info.pings++
info.send = time.Now()
t.sendReq(&req, info)
}
func (t *dht) addToMill(info *dhtInfo, target *NodeID) {
rumor := dht_rumor{
info: info,
target: target,
}
t.rumorMill = append(t.rumorMill, rumor)
}
func (t *dht) doMaintenance() {
// First clean up reqs
for key, reqs := range t.reqs {
for target, timeout := range reqs {
if time.Since(timeout) > time.Minute {
delete(reqs, target)
}
}
if len(reqs) == 0 {
delete(t.reqs, key)
}
}
if len(t.rumorMill) == 0 {
// Ping the least recently contacted node
// This is to make sure we eventually notice when someone times out
var oldest *dhtInfo
last := 0
for bidx := 0; bidx < t.nBuckets(); bidx++ {
b := t.getBucket(bidx)
if !b.isEmpty() {
last = bidx
toPing := b.nextToPing()
if toPing == nil {
continue
} // We've recently pinged everyone in b
if oldest == nil || toPing.recv.Before(oldest.recv) {
oldest = toPing
}
}
}
if oldest != nil && time.Since(oldest.recv) > time.Minute {
t.addToMill(oldest, nil)
} // if the DHT isn't empty
// Refresh buckets
if t.offset > last {
t.offset = 0
}
target := t.getTarget(t.offset)
for _, info := range t.lookup(target) {
if time.Since(info.recv) > time.Minute {
t.addToMill(info, target)
break
}
}
t.offset++
}
if len(t.rumorMill) > 0 {
var rumor dht_rumor
rumor, t.rumorMill = t.rumorMill[0], t.rumorMill[1:]
t.ping(rumor.info, rumor.target)
}
}
func dht_firstCloserThanThird(first *NodeID,
second *NodeID,
third *NodeID) bool {
for idx := 0; idx < NodeIDLen; idx++ {
f := first[idx] ^ second[idx]
t := third[idx] ^ second[idx]
if f == t {
continue
}
return f < t
}
return false
}
func (t *dht) resetPeers() {
for _, b := range t.buckets_hidden {
b.peers = b.peers[:0]
}
}