5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-14 03:20:28 +00:00

refactor dht code to call arbitrary callbacks instead of only searches.checkDHTRes, and add admin API fuction to dhtPing a node (with an optional target NodeID)

This commit is contained in:
Arceliar 2018-11-25 16:10:32 -06:00
parent 9f16d0ed1f
commit d520a8a1d5
3 changed files with 112 additions and 31 deletions

View File

@ -302,6 +302,24 @@ func (a *admin) init(c *Core, listenaddr string) {
return admin_info{"not_removed": []string{fmt.Sprintf("%s via %s", in["subnet"].(string), in["destPubKey"].(string))}}, errors.New("Failed to remove route") return admin_info{"not_removed": []string{fmt.Sprintf("%s via %s", in["subnet"].(string), in["destPubKey"].(string))}}, errors.New("Failed to remove route")
} }
}) })
a.addHandler("dhtPing", []string{"key", "coords", "[target]"}, func(in admin_info) (admin_info, error) {
if in["target"] == nil {
in["target"] = "none"
}
result, err := a.admin_dhtPing(in["key"].(string), in["coords"].(string), in["target"].(string))
if err == nil {
var infos []map[string]string
for _, dinfo := range result.Infos {
info := make(map[string]string)
info["key"] = hex.EncodeToString(dinfo.key[:])
info["coords"] = fmt.Sprintf("%v", dinfo.coords)
infos = append(infos, info)
}
return admin_info{"nodes": infos}, nil
} else {
return admin_info{}, err
}
})
} }
// start runs the admin API socket to listen for / respond to admin API calls. // start runs the admin API socket to listen for / respond to admin API calls.
@ -536,6 +554,7 @@ func (a *admin) getData_getSelf() *admin_nodeInfo {
table := a.core.switchTable.table.Load().(lookupTable) table := a.core.switchTable.table.Load().(lookupTable)
coords := table.self.getCoords() coords := table.self.getCoords()
self := admin_nodeInfo{ self := admin_nodeInfo{
{"key", hex.EncodeToString(a.core.boxPub[:])},
{"ip", a.core.GetAddress().String()}, {"ip", a.core.GetAddress().String()},
{"subnet", a.core.GetSubnet().String()}, {"subnet", a.core.GetSubnet().String()},
{"coords", fmt.Sprint(coords)}, {"coords", fmt.Sprint(coords)},
@ -702,6 +721,60 @@ func (a *admin) removeAllowedEncryptionPublicKey(bstr string) (err error) {
return return
} }
// Send a DHT ping to the node with the provided key and coords, optionally looking up the specified target NodeID.
func (a *admin) admin_dhtPing(keyString, coordString, targetString string) (dhtRes, error) {
var key boxPubKey
if keyBytes, err := hex.DecodeString(keyString); err != nil {
return dhtRes{}, err
} else {
copy(key[:], keyBytes)
}
var coords []byte
for _, cstr := range strings.Split(strings.Trim(coordString, "[]"), " ") {
if u64, err := strconv.ParseUint(cstr, 10, 8); err != nil {
return dhtRes{}, err
} else {
coords = append(coords, uint8(u64))
}
}
resCh := make(chan *dhtRes)
info := dhtInfo{
key: key,
coords: coords,
}
target := *info.getNodeID()
if targetString == "none" {
// Leave the default target in place
} else if targetBytes, err := hex.DecodeString(targetString); err != nil {
return dhtRes{}, err
} else if len(targetBytes) != len(target) {
return dhtRes{}, errors.New("Incorrect target NodeID length")
} else {
target = NodeID{}
copy(target[:], targetBytes)
}
rq := dhtReqKey{info.key, target}
sendPing := func() {
a.core.dht.addCallback(&rq, func(res *dhtRes) {
defer func() { recover() }()
select {
case resCh <- res:
default:
}
})
a.core.dht.ping(&info, &target)
}
a.core.router.doAdmin(sendPing)
go func() {
time.Sleep(6 * time.Second)
close(resCh)
}()
for res := range resCh {
return *res, nil
}
return dhtRes{}, errors.New(fmt.Sprintf("DHT ping timeout: %s", keyString))
}
// getResponse_dot returns a response for a graphviz dot formatted representation of the known parts of the network. // getResponse_dot returns a response for a graphviz dot formatted representation of the known parts of the network.
// This is color-coded and labeled, and includes the self node, switch peers, nodes known to the DHT, and nodes with open sessions. // This is color-coded and labeled, and includes the self node, switch peers, nodes known to the DHT, and nodes with open sessions.
// The graph is structured as a tree with directed links leading away from the root. // The graph is structured as a tree with directed links leading away from the root.

View File

@ -49,12 +49,19 @@ type dhtRes struct {
Infos []*dhtInfo // response Infos []*dhtInfo // response
} }
// Parts of a DHT req usable as a key in a map.
type dhtReqKey struct {
key boxPubKey
dest NodeID
}
// The main DHT struct. // The main DHT struct.
type dht struct { type dht struct {
core *Core core *Core
nodeID NodeID nodeID NodeID
peers chan *dhtInfo // other goroutines put incoming dht updates here peers chan *dhtInfo // other goroutines put incoming dht updates here
reqs map[boxPubKey]map[NodeID]time.Time reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests
callbacks map[dhtReqKey][]func(*dhtRes) // Search and admin lookup callbacks
// These next two could be replaced by a single linked list or similar... // These next two could be replaced by a single linked list or similar...
table map[NodeID]*dhtInfo table map[NodeID]*dhtInfo
imp []*dhtInfo imp []*dhtInfo
@ -65,13 +72,14 @@ func (t *dht) init(c *Core) {
t.core = c t.core = c
t.nodeID = *t.core.GetNodeID() t.nodeID = *t.core.GetNodeID()
t.peers = make(chan *dhtInfo, 1024) t.peers = make(chan *dhtInfo, 1024)
t.callbacks = make(map[dhtReqKey][]func(*dhtRes))
t.reset() t.reset()
} }
// Resets the DHT in response to coord changes. // Resets the DHT in response to coord changes.
// This empties all info from the DHT and drops outstanding requests. // This empties all info from the DHT and drops outstanding requests.
func (t *dht) reset() { func (t *dht) reset() {
t.reqs = make(map[boxPubKey]map[NodeID]time.Time) t.reqs = make(map[dhtReqKey]time.Time)
t.table = make(map[NodeID]*dhtInfo) t.table = make(map[NodeID]*dhtInfo)
t.imp = nil t.imp = nil
} }
@ -194,19 +202,31 @@ func (t *dht) sendRes(res *dhtRes, req *dhtReq) {
t.core.router.out(packet) t.core.router.out(packet)
} }
// Adds a callback and removes it after some timeout.
func (t *dht) addCallback(rq *dhtReqKey, callback func(*dhtRes)) {
t.callbacks[*rq] = append(t.callbacks[*rq], callback)
go func() {
time.Sleep(6 * time.Second)
t.core.router.admin <- func() {
delete(t.callbacks, *rq)
}
}()
}
// Reads a lookup response, checks that we had sent a matching request, and processes the response info. // Reads a lookup response, checks that we had sent a matching request, and processes the response info.
// This mainly consists of updating the node we asked in our DHT (they responded, so we know they're still alive), and deciding if we want to do anything with their responses // This mainly consists of updating the node we asked in our DHT (they responded, so we know they're still alive), and deciding if we want to do anything with their responses
func (t *dht) handleRes(res *dhtRes) { func (t *dht) handleRes(res *dhtRes) {
rq := dhtReqKey{res.Key, res.Dest}
for _, callback := range t.callbacks[rq] {
callback(res)
}
delete(t.callbacks, rq)
t.core.searches.handleDHTRes(res) t.core.searches.handleDHTRes(res)
reqs, isIn := t.reqs[res.Key] _, isIn := t.reqs[rq]
if !isIn { if !isIn {
return return
} }
_, isIn = reqs[res.Dest] delete(t.reqs, rq)
if !isIn {
return
}
delete(reqs, res.Dest)
rinfo := dhtInfo{ rinfo := dhtInfo{
key: res.Key, key: res.Key,
coords: res.Coords, coords: res.Coords,
@ -243,15 +263,8 @@ func (t *dht) sendReq(req *dhtReq, dest *dhtInfo) {
} }
packet := p.encode() packet := p.encode()
t.core.router.out(packet) t.core.router.out(packet)
reqsToDest, isIn := t.reqs[dest.key] rq := dhtReqKey{req.Key, req.Dest}
if !isIn { t.reqs[rq] = time.Now()
t.reqs[dest.key] = make(map[NodeID]time.Time)
reqsToDest, isIn = t.reqs[dest.key]
if !isIn {
panic("This should never happen")
}
}
reqsToDest[req.Dest] = time.Now()
} }
// Sends a lookup to this info, looking for the target. // Sends a lookup to this info, looking for the target.
@ -273,17 +286,10 @@ func (t *dht) ping(info *dhtInfo, target *NodeID) {
// Periodic maintenance work to keep important DHT nodes alive. // Periodic maintenance work to keep important DHT nodes alive.
func (t *dht) doMaintenance() { func (t *dht) doMaintenance() {
now := time.Now() now := time.Now()
newReqs := make(map[boxPubKey]map[NodeID]time.Time, len(t.reqs)) newReqs := make(map[dhtReqKey]time.Time, len(t.reqs))
for key, dests := range t.reqs { for key, start := range t.reqs {
newDests := make(map[NodeID]time.Time, len(dests)) if now.Sub(start) < 6*time.Second {
for nodeID, start := range dests { newReqs[key] = start
if now.Sub(start) > 6*time.Second {
continue
}
newDests[nodeID] = start
}
if len(newDests) > 0 {
newReqs[key] = newDests
} }
} }
t.reqs = newReqs t.reqs = newReqs

View File

@ -132,6 +132,8 @@ func (s *searches) doSearchStep(sinfo *searchInfo) {
// Send to the next search target // Send to the next search target
var next *dhtInfo var next *dhtInfo
next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:] next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:]
rq := dhtReqKey{next.key, sinfo.dest}
s.core.dht.addCallback(&rq, s.handleDHTRes)
s.core.dht.ping(next, &sinfo.dest) s.core.dht.ping(next, &sinfo.dest)
} }
} }