mirror of
https://github.com/cwinfo/yggdrasil-go.git
synced 2024-11-22 09:30:28 +00:00
Merge pull request #212 from Arceliar/admin
Add dhtPing to the admin interface
This commit is contained in:
commit
bd9055ddd7
@ -302,6 +302,26 @@ func (a *admin) init(c *Core, listenaddr string) {
|
||||
return admin_info{"not_removed": []string{fmt.Sprintf("%s via %s", in["subnet"].(string), in["destPubKey"].(string))}}, errors.New("Failed to remove route")
|
||||
}
|
||||
})
|
||||
a.addHandler("dhtPing", []string{"key", "coords", "[target]"}, func(in admin_info) (admin_info, error) {
|
||||
if in["target"] == nil {
|
||||
in["target"] = "none"
|
||||
}
|
||||
result, err := a.admin_dhtPing(in["key"].(string), in["coords"].(string), in["target"].(string))
|
||||
if err == nil {
|
||||
infos := make(map[string]map[string]string, len(result.Infos))
|
||||
for _, dinfo := range result.Infos {
|
||||
info := map[string]string{
|
||||
"key": hex.EncodeToString(dinfo.key[:]),
|
||||
"coords": fmt.Sprintf("%v", dinfo.coords),
|
||||
}
|
||||
addr := net.IP(address_addrForNodeID(getNodeID(&dinfo.key))[:]).String()
|
||||
infos[addr] = info
|
||||
}
|
||||
return admin_info{"nodes": infos}, nil
|
||||
} else {
|
||||
return admin_info{}, err
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// start runs the admin API socket to listen for / respond to admin API calls.
|
||||
@ -536,6 +556,7 @@ func (a *admin) getData_getSelf() *admin_nodeInfo {
|
||||
table := a.core.switchTable.table.Load().(lookupTable)
|
||||
coords := table.self.getCoords()
|
||||
self := admin_nodeInfo{
|
||||
{"key", hex.EncodeToString(a.core.boxPub[:])},
|
||||
{"ip", a.core.GetAddress().String()},
|
||||
{"subnet", a.core.GetSubnet().String()},
|
||||
{"coords", fmt.Sprint(coords)},
|
||||
@ -702,6 +723,60 @@ func (a *admin) removeAllowedEncryptionPublicKey(bstr string) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// Send a DHT ping to the node with the provided key and coords, optionally looking up the specified target NodeID.
|
||||
func (a *admin) admin_dhtPing(keyString, coordString, targetString string) (dhtRes, error) {
|
||||
var key boxPubKey
|
||||
if keyBytes, err := hex.DecodeString(keyString); err != nil {
|
||||
return dhtRes{}, err
|
||||
} else {
|
||||
copy(key[:], keyBytes)
|
||||
}
|
||||
var coords []byte
|
||||
for _, cstr := range strings.Split(strings.Trim(coordString, "[]"), " ") {
|
||||
if u64, err := strconv.ParseUint(cstr, 10, 8); err != nil {
|
||||
return dhtRes{}, err
|
||||
} else {
|
||||
coords = append(coords, uint8(u64))
|
||||
}
|
||||
}
|
||||
resCh := make(chan *dhtRes, 1)
|
||||
info := dhtInfo{
|
||||
key: key,
|
||||
coords: coords,
|
||||
}
|
||||
target := *info.getNodeID()
|
||||
if targetString == "none" {
|
||||
// Leave the default target in place
|
||||
} else if targetBytes, err := hex.DecodeString(targetString); err != nil {
|
||||
return dhtRes{}, err
|
||||
} else if len(targetBytes) != len(target) {
|
||||
return dhtRes{}, errors.New("Incorrect target NodeID length")
|
||||
} else {
|
||||
target = NodeID{}
|
||||
copy(target[:], targetBytes)
|
||||
}
|
||||
rq := dhtReqKey{info.key, target}
|
||||
sendPing := func() {
|
||||
a.core.dht.addCallback(&rq, func(res *dhtRes) {
|
||||
defer func() { recover() }()
|
||||
select {
|
||||
case resCh <- res:
|
||||
default:
|
||||
}
|
||||
})
|
||||
a.core.dht.ping(&info, &target)
|
||||
}
|
||||
a.core.router.doAdmin(sendPing)
|
||||
go func() {
|
||||
time.Sleep(6 * time.Second)
|
||||
close(resCh)
|
||||
}()
|
||||
for res := range resCh {
|
||||
return *res, nil
|
||||
}
|
||||
return dhtRes{}, errors.New(fmt.Sprintf("DHT ping timeout: %s", keyString))
|
||||
}
|
||||
|
||||
// getResponse_dot returns a response for a graphviz dot formatted representation of the known parts of the network.
|
||||
// This is color-coded and labeled, and includes the self node, switch peers, nodes known to the DHT, and nodes with open sessions.
|
||||
// The graph is structured as a tree with directed links leading away from the root.
|
||||
|
@ -49,12 +49,19 @@ type dhtRes struct {
|
||||
Infos []*dhtInfo // response
|
||||
}
|
||||
|
||||
// Parts of a DHT req usable as a key in a map.
|
||||
type dhtReqKey struct {
|
||||
key boxPubKey
|
||||
dest NodeID
|
||||
}
|
||||
|
||||
// The main DHT struct.
|
||||
type dht struct {
|
||||
core *Core
|
||||
nodeID NodeID
|
||||
peers chan *dhtInfo // other goroutines put incoming dht updates here
|
||||
reqs map[boxPubKey]map[NodeID]time.Time
|
||||
core *Core
|
||||
nodeID NodeID
|
||||
peers chan *dhtInfo // other goroutines put incoming dht updates here
|
||||
reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests
|
||||
callbacks map[dhtReqKey]dht_callbackInfo // Search and admin lookup callbacks
|
||||
// These next two could be replaced by a single linked list or similar...
|
||||
table map[NodeID]*dhtInfo
|
||||
imp []*dhtInfo
|
||||
@ -65,13 +72,14 @@ func (t *dht) init(c *Core) {
|
||||
t.core = c
|
||||
t.nodeID = *t.core.GetNodeID()
|
||||
t.peers = make(chan *dhtInfo, 1024)
|
||||
t.callbacks = make(map[dhtReqKey]dht_callbackInfo)
|
||||
t.reset()
|
||||
}
|
||||
|
||||
// Resets the DHT in response to coord changes.
|
||||
// This empties all info from the DHT and drops outstanding requests.
|
||||
func (t *dht) reset() {
|
||||
t.reqs = make(map[boxPubKey]map[NodeID]time.Time)
|
||||
t.reqs = make(map[dhtReqKey]time.Time)
|
||||
t.table = make(map[NodeID]*dhtInfo)
|
||||
t.imp = nil
|
||||
}
|
||||
@ -194,19 +202,30 @@ func (t *dht) sendRes(res *dhtRes, req *dhtReq) {
|
||||
t.core.router.out(packet)
|
||||
}
|
||||
|
||||
type dht_callbackInfo struct {
|
||||
f func(*dhtRes)
|
||||
time time.Time
|
||||
}
|
||||
|
||||
// Adds a callback and removes it after some timeout.
|
||||
func (t *dht) addCallback(rq *dhtReqKey, callback func(*dhtRes)) {
|
||||
info := dht_callbackInfo{callback, time.Now().Add(6 * time.Second)}
|
||||
t.callbacks[*rq] = info
|
||||
}
|
||||
|
||||
// Reads a lookup response, checks that we had sent a matching request, and processes the response info.
|
||||
// This mainly consists of updating the node we asked in our DHT (they responded, so we know they're still alive), and deciding if we want to do anything with their responses
|
||||
func (t *dht) handleRes(res *dhtRes) {
|
||||
t.core.searches.handleDHTRes(res)
|
||||
reqs, isIn := t.reqs[res.Key]
|
||||
rq := dhtReqKey{res.Key, res.Dest}
|
||||
if callback, isIn := t.callbacks[rq]; isIn {
|
||||
callback.f(res)
|
||||
delete(t.callbacks, rq)
|
||||
}
|
||||
_, isIn := t.reqs[rq]
|
||||
if !isIn {
|
||||
return
|
||||
}
|
||||
_, isIn = reqs[res.Dest]
|
||||
if !isIn {
|
||||
return
|
||||
}
|
||||
delete(reqs, res.Dest)
|
||||
delete(t.reqs, rq)
|
||||
rinfo := dhtInfo{
|
||||
key: res.Key,
|
||||
coords: res.Coords,
|
||||
@ -243,15 +262,8 @@ func (t *dht) sendReq(req *dhtReq, dest *dhtInfo) {
|
||||
}
|
||||
packet := p.encode()
|
||||
t.core.router.out(packet)
|
||||
reqsToDest, isIn := t.reqs[dest.key]
|
||||
if !isIn {
|
||||
t.reqs[dest.key] = make(map[NodeID]time.Time)
|
||||
reqsToDest, isIn = t.reqs[dest.key]
|
||||
if !isIn {
|
||||
panic("This should never happen")
|
||||
}
|
||||
}
|
||||
reqsToDest[req.Dest] = time.Now()
|
||||
rq := dhtReqKey{dest.key, req.Dest}
|
||||
t.reqs[rq] = time.Now()
|
||||
}
|
||||
|
||||
// Sends a lookup to this info, looking for the target.
|
||||
@ -273,20 +285,20 @@ func (t *dht) ping(info *dhtInfo, target *NodeID) {
|
||||
// Periodic maintenance work to keep important DHT nodes alive.
|
||||
func (t *dht) doMaintenance() {
|
||||
now := time.Now()
|
||||
newReqs := make(map[boxPubKey]map[NodeID]time.Time, len(t.reqs))
|
||||
for key, dests := range t.reqs {
|
||||
newDests := make(map[NodeID]time.Time, len(dests))
|
||||
for nodeID, start := range dests {
|
||||
if now.Sub(start) > 6*time.Second {
|
||||
continue
|
||||
}
|
||||
newDests[nodeID] = start
|
||||
}
|
||||
if len(newDests) > 0 {
|
||||
newReqs[key] = newDests
|
||||
newReqs := make(map[dhtReqKey]time.Time, len(t.reqs))
|
||||
for key, start := range t.reqs {
|
||||
if now.Sub(start) < 6*time.Second {
|
||||
newReqs[key] = start
|
||||
}
|
||||
}
|
||||
t.reqs = newReqs
|
||||
newCallbacks := make(map[dhtReqKey]dht_callbackInfo, len(t.callbacks))
|
||||
for key, callback := range t.callbacks {
|
||||
if now.Before(callback.time) {
|
||||
newCallbacks[key] = callback
|
||||
}
|
||||
}
|
||||
t.callbacks = newCallbacks
|
||||
for infoID, info := range t.table {
|
||||
if now.Sub(info.recv) > time.Minute || info.pings > 3 {
|
||||
delete(t.table, infoID)
|
||||
@ -349,6 +361,9 @@ func (t *dht) getImportant() []*dhtInfo {
|
||||
|
||||
// Returns true if this is a node we need to keep track of for the DHT to work.
|
||||
func (t *dht) isImportant(ninfo *dhtInfo) bool {
|
||||
if ninfo.key == t.core.boxPub {
|
||||
return false
|
||||
}
|
||||
important := t.getImportant()
|
||||
// Check if ninfo is of equal or greater importance to what we already know
|
||||
loc := t.core.switchTable.getLocator()
|
||||
|
@ -132,6 +132,8 @@ func (s *searches) doSearchStep(sinfo *searchInfo) {
|
||||
// Send to the next search target
|
||||
var next *dhtInfo
|
||||
next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:]
|
||||
rq := dhtReqKey{next.key, sinfo.dest}
|
||||
s.core.dht.addCallback(&rq, s.handleDHTRes)
|
||||
s.core.dht.ping(next, &sinfo.dest)
|
||||
}
|
||||
}
|
||||
|
@ -107,7 +107,7 @@ func main() {
|
||||
switch strings.ToLower(req["request"].(string)) {
|
||||
case "dot":
|
||||
fmt.Println(res["dot"])
|
||||
case "help", "getpeers", "getswitchpeers", "getdht", "getsessions":
|
||||
case "help", "getpeers", "getswitchpeers", "getdht", "getsessions", "dhtping":
|
||||
maxWidths := make(map[string]int)
|
||||
var keyOrder []string
|
||||
keysOrdered := false
|
||||
|
Loading…
Reference in New Issue
Block a user